查看原文
其他

动手造轮子:实现一个简单的 EventBus

The following article is from amazingdotnet Author WeihanLi


动手造轮子:实现一个简单的 EventBus

Intro

EventBus 是一种事件发布订阅模式,通过 EventBus 我们可以很方便的实现解耦,将事件的发起和事件的处理的很好的分隔开来,很好的实现解耦。微软官方的示例项目 EShopOnContainers 也有在使用 EventBus 。

这里的 EventBus 实现也是参考借鉴了微软 eShopOnContainers 项目。

EventBus 处理流程:

微服务间使用 EventBus 实现系统间解耦:

借助 EventBus 我们可以很好的实现组件之间,服务之间,系统之间的解耦以及相互通信的问题。

起初觉得 EventBus 和 MQ 其实差不多嘛,都是通过异步处理来实现解耦合,高性能。后来看到了下面这张图才算明白为什么要用 EventBus 以及 EventBus 和 MQ 之间的关系,EventBus 是抽象的,可以用 MQ 来实现 EventBus。

为什么要使用 EventBus

  1. 解耦合(轻松的实现系统间解耦)

  2. 高性能可扩展(每一个事件都是简单独立且不可更改的对象,只需要保存新增的事件,不涉及其他的变更删除操作)

  3. 系统审计(每一个事件都是不可变更的,每一个事件都是可追溯的)

  4. ...

EventBus 整体架构:

  • IEventBase :所有的事件应该实现这个接口,这个接口定义了事件的唯一id EventId 和事件发生的事件 EventAt

  • IEventHandler:定义了一个 Handle 方法来处理相应的事件

  • IEventStore:所有的事件的处理存储,保存事件的 IEventHandler,一般不会直接操作,通过 EventBus 的订阅和取消订阅来操作 EventStore

  • IEventBus:用来发布/订阅/取消订阅事件,并将事件的某一个 IEventHandler 保存到 EventStore 或从 EventStore 中移除

使用示例

来看一个使用示例,完整代码示例:

  1. internal class EventTest

  2. {

  3. public static void MainTest()

  4. {

  5. var eventBus = DependencyResolver.Current.ResolveService<IEventBus>();

  6. eventBus.Subscribe<CounterEvent, CounterEventHandler1>();

  7. eventBus.Subscribe<CounterEvent, CounterEventHandler2>();

  8. eventBus.Subscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();

  9. eventBus.Publish(new CounterEvent { Counter = 1 });


  10. eventBus.Unsubscribe<CounterEvent, CounterEventHandler1>();

  11. eventBus.Unsubscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();

  12. eventBus.Publish(new CounterEvent { Counter = 2 });

  13. }

  14. }


  15. internal class CounterEvent : EventBase

  16. {

  17. public int Counter { get; set; }

  18. }


  19. internal class CounterEventHandler1 : IEventHandler<CounterEvent>

  20. {

  21. public Task Handle(CounterEvent @event)

  22. {

  23. LogHelper.GetLogger<CounterEventHandler1>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");

  24. return Task.CompletedTask;

  25. }

  26. }


  27. internal class CounterEventHandler2 : IEventHandler<CounterEvent>

  28. {

  29. public Task Handle(CounterEvent @event)

  30. {

  31. LogHelper.GetLogger<CounterEventHandler2>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");

  32. return Task.CompletedTask;

  33. }

  34. }

具体实现

EventStoreInMemory 实现:

EventStoreInMemory 是 IEventStore 将数据放在内存中的实现,使用了 ConcurrentDictionary 以及 HashSet 来尽可能的保证高效,具体实现代码如下:

  1. public class EventStoreInMemory : IEventStore

  2. {

  3. private readonly ConcurrentDictionary<string, HashSet<Type>> _eventHandlers = new ConcurrentDictionary<string, HashSet<Type>>();


  4. public bool AddSubscription<TEvent, TEventHandler>()

  5. where TEvent : IEventBase

  6. where TEventHandler : IEventHandler<TEvent>

  7. {

  8. var eventKey = GetEventKey<TEvent>();

  9. if (_eventHandlers.ContainsKey(eventKey))

  10. {

  11. return _eventHandlers[eventKey].Add(typeof(TEventHandler));

  12. }

  13. else

  14. {

  15. return _eventHandlers.TryAdd(eventKey, new HashSet<Type>()

  16. {

  17. typeof(TEventHandler)

  18. });

  19. }

  20. }


  21. public bool Clear()

  22. {

  23. _eventHandlers.Clear();

  24. return true;

  25. }


  26. public ICollection<Type> GetEventHandlerTypes<TEvent>() where TEvent : IEventBase

  27. {

  28. if(_eventHandlers.Count == 0)

  29. return new Type[0];

  30. var eventKey = GetEventKey<TEvent>();

  31. if (_eventHandlers.TryGetValue(eventKey, out var handlers))

  32. {

  33. return handlers;

  34. }

  35. return new Type[0];

  36. }


  37. public string GetEventKey<TEvent>()

  38. {

  39. return typeof(TEvent).FullName;

  40. }


  41. public bool HasSubscriptionsForEvent<TEvent>() where TEvent : IEventBase

  42. {

  43. if(_eventHandlers.Count == 0)

  44. return false;


  45. var eventKey = GetEventKey<TEvent>();

  46. return _eventHandlers.ContainsKey(eventKey);

  47. }


  48. public bool RemoveSubscription<TEvent, TEventHandler>()

  49. where TEvent : IEventBase

  50. where TEventHandler : IEventHandler<TEvent>

  51. {

  52. if(_eventHandlers.Count == 0)

  53. return false;


  54. var eventKey = GetEventKey<TEvent>();

  55. if (_eventHandlers.ContainsKey(eventKey))

  56. {

  57. return _eventHandlers[eventKey].Remove(typeof(TEventHandler));

  58. }

  59. return false;

  60. }

  61. }

EventBusInMemory 的实现,从上面可以看到 EventStore 保存的是 IEventHandler 对应的 Type,在 Publish 的时候根据 Type 从 IoC 容器中取得相应的 Handler 即可,如果没有在 IoC 容器中找到对应的类型,则会尝试创建一个类型实例,然后调用 IEventHandlerHandle 方法,代码如下:

  1. /// <summary>

  2. /// EventBus in memory

  3. /// </summary>

  4. public class EventBus : IEventBus

  5. {

  6. private static readonly ILogHelperLogger Logger = Helpers.LogHelper.GetLogger<EventBus>();


  7. private readonly IEventStore _eventStore;

  8. private readonly IServiceProvider _serviceProvider;


  9. public EventBus(IEventStore eventStore, IServiceProvider serviceProvider = null)

  10. {

  11. _eventStore = eventStore;

  12. _serviceProvider = serviceProvider ?? DependencyResolver.Current;

  13. }


  14. public bool Publish<TEvent>(TEvent @event) where TEvent : IEventBase

  15. {

  16. if (!_eventStore.HasSubscriptionsForEvent<TEvent>())

  17. {

  18. return false;

  19. }

  20. var handlers = _eventStore.GetEventHandlerTypes<TEvent>();

  21. if (handlers.Count > 0)

  22. {

  23. var handlerTasks = new List<Task>();

  24. foreach (var handlerType in handlers)

  25. {

  26. try

  27. {

  28. if (_serviceProvider.GetServiceOrCreateInstance(handlerType) is IEventHandler<TEvent> handler)

  29. {

  30. handlerTasks.Add(handler.Handle(@event));

  31. }

  32. }

  33. catch (Exception ex)

  34. {

  35. Logger.Error(ex, $"handle event [{_eventStore.GetEventKey<TEvent>()}] error, eventHandlerType:{handlerType.FullName}");

  36. }

  37. }

  38. handlerTasks.WhenAll().ConfigureAwait(false);


  39. return true;

  40. }

  41. return false;

  42. }


  43. public bool Subscribe<TEvent, TEventHandler>()

  44. where TEvent : IEventBase

  45. where TEventHandler : IEventHandler<TEvent>

  46. {

  47. return _eventStore.AddSubscription<TEvent, TEventHandler>();

  48. }


  49. public bool Unsubscribe<TEvent, TEventHandler>()

  50. where TEvent : IEventBase

  51. where TEventHandler : IEventHandler<TEvent>

  52. {

  53. return _eventStore.RemoveSubscription<TEvent, TEventHandler>();

  54. }

  55. }

项目实例

来看一个实际的项目中的使用,在我的活动室预约项目中有一个公告的模块,访问公告详情页面,这个公告的访问次数加1,把这个访问次数加1改成了用 EventBus 来实现,实际项目代码:https://github.com/WeihanLi/ActivityReservation/blob/67e2cb8e92876629a7af6dc051745dd8c7e9faeb/ActivityReservation/Startup.cs

0. 定义 Event 以及 EventHandler


  1. public class NoticeViewEvent : EventBase

  2. {

  3. public Guid NoticeId { get; set; }


  4. // UserId

  5. // IP

  6. // ...

  7. }


  8. public class NoticeViewEventHandler : IEventHandler<NoticeViewEvent>

  9. {

  10. public async Task Handle(NoticeViewEvent @event)

  11. {

  12. await DependencyResolver.Current.TryInvokeServiceAsync<ReservationDbContext>(async dbContext =>

  13. {

  14. var notice = await dbContext.Notices.FindAsync(@event.NoticeId);

  15. notice.NoticeVisitCount += 1;

  16. await dbContext.SaveChangesAsync();

  17. });

  18. }

  19. }

这里的 Event 只定义了一个 NoticeId ,其实也可以把请求信息如IP/UA等信息加进去,在 EventHandler里处理以便日后数据分析。

1. 册 EventBus 相关服务以及 EventHandlers

  1. services.AddSingleton<IEventBus, EventBus>();

  2. services.AddSingleton<IEventStore, EventStoreInMemory>();

  3. //register EventHandlers

  4. services.AddSingleton<NoticeViewEventHandler>();


2. 订阅事件


  1. public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IEventBus eventBus)

  2. {

  3. eventBus.Subscribe<NoticeViewEvent, NoticeViewEventHandler>();

  4. // ...

  5. }

3. 发布事件


  1. eventBus.Publish(new NoticeViewEvent { NoticeId = notice.NoticeId });

Reference

  • https://github.com/dotnet-architecture/eShopOnContainers

  • https://docs.microsoft.com/zh-cn/previous-versions/msp-n-p/jj591559(v=pandp.10)

  • https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/integration-event-based-microservice-communications

  • https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/subscribe-events

  • https://github.com/sheng-jie/EventBus

  • https://github.com/WeihanLi/ActivityReservation

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存